package com.atguigu.flink.day08;

import com.atguigu.flink.beans.WaterSensor;
import com.atguigu.flink.func.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author Felix
 * @date 2023/12/9
 * 该案例演示了键控状态-AggregatingState
 * 需求：计算每种传感器的平均水位
 */
public class Flink05_KeyedState_AggregatingState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> wsDS = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        //按照传感器的id进行分组
        KeyedStream<WaterSensor, String> keyedDS = wsDS.keyBy(WaterSensor::getId);

        //对分组后的数据进行处理
        keyedDS.process(
            new KeyedProcessFunction<String, WaterSensor, String>() {

                private AggregatingState<Integer,Double> vcAvgState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> aggregatingStateDescriptor = new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                        "vcAvgState",
                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                            @Override
                            public Tuple2<Integer, Integer> createAccumulator() {
                                return Tuple2.of(0,0);
                            }

                            @Override
                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                return Tuple2.of(accumulator.f0 + value,accumulator.f1 + 1);
                            }

                            @Override
                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                return accumulator.f0*1D/accumulator.f1;
                            }

                            @Override
                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                return null;
                            }
                        },
                        Types.TUPLE(Types.INT,Types.INT)
                    );
                    vcAvgState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
                }

                @Override
                public void processElement(WaterSensor ws, Context ctx, Collector<String> out) throws Exception {
                    Integer vc = ws.vc;
                    vcAvgState.add(vc);
                    out.collect("传感器id：" + ctx.getCurrentKey() +",平均水位：" +  vcAvgState.get());
                }
            }
        ).print();

        env.execute();
    }
}
